package com.xiaofan

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector


object OrderTimeoutWithoutCep {
  def main(args: Array[String]): Unit = {

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val inputStream: DataStream[String] = env.readTextFile("D:\\big-data\\code\\UserBehaviorAnalysis\\OrderPayDetect\\src\\main\\resources\\OrderLog.csv")
    val orderEventStream: DataStream[OrderEvent] = inputStream
      .map {
        data => {
          val arr: Array[String] = data.split(",")
          OrderEvent(arr(0).toLong, arr(1), arr(2), arr(3).toLong)
        }
      }
      .assignAscendingTimestamps(_.timestamp * 1000L)


    // 自定义ProcessFunction进行复杂事件的检测
    val resultStream: DataStream[OrderResult] = orderEventStream
      .keyBy(_.orderId)
      .process(new OrderPayMatchResult())

    resultStream.print("result")
    resultStream.getSideOutput(new OutputTag[OrderResult]("timeout")).print("timeout")


    env.execute("order time job")
  }
}


/**
 * 自定义实现KeyedProcessFunction
 */
class OrderPayMatchResult() extends KeyedProcessFunction[Long, OrderEvent, OrderResult] {

  // 定义状态，标识位标识create、pay是否已经来过，定时器时间戳
  lazy val isCreatedState: ValueState[Boolean] = getRuntimeContext.getState(new ValueStateDescriptor[Boolean]("isCreated", classOf[Boolean]))
  lazy val isPayedState: ValueState[Boolean] = getRuntimeContext.getState(new ValueStateDescriptor[Boolean]("isPayed", classOf[Boolean]))
  lazy val timerTsState: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("timer-ts", classOf[Long]))
  // 定义测输出流标签
  val orderTimeoutOutputTag = new OutputTag[OrderResult]("timeout")


  override def processElement(value: OrderEvent, ctx: KeyedProcessFunction[Long, OrderEvent, OrderResult]#Context, out: Collector[OrderResult]): Unit = {
    // 先拿到当前状态
    val isPayed = isPayedState.value()
    val isCreated = isCreatedState.value()
    val timerTs = timerTsState.value()

    // 判断当前时间类型，看是create还是pay
    if (value.eventType == "create") {
      if (isPayed) {
        out.collect(OrderResult(value.orderId, "payed successfully..."))
        ctx.timerService().deleteEventTimeTimer(timerTs)
        timerTsState.clear()
        isPayedState.clear()
        isCreatedState.clear()
      } else {
        val ts = value.timestamp * 1000L + 900 * 1000L
        ctx.timerService().registerEventTimeTimer(ts)

        timerTsState.update(ts)
        isCreatedState.update(true)
      }
    } else if (value.eventType == "pay") {
      if (isCreated) {
        // 如果已经create过，匹配成功，但是还要判断一下pay时间是否超过了定时器时间
        if (value.timestamp * 1000L < timerTs) {
          // 没有超时，正常输出
          out.collect(OrderResult(value.orderId, "payed successfully"))
        } else {
          // 已经超时，输出超时
          ctx.output(orderTimeoutOutputTag, OrderResult(value.orderId, "payed but already timeout"))
        }

        // 只要输出结果，当前order处理已经结束，清空状态和定时器
        ctx.timerService().deleteEventTimeTimer(timerTs)
        timerTsState.clear()
        isPayedState.clear()
        isCreatedState.clear()
      } else {
        ctx.timerService().registerEventTimeTimer(value.timestamp * 1000L)

        timerTsState.update(value.timestamp * 1000L)
        isPayedState.update(true)
      }
    }

  }

  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, OrderEvent, OrderResult]#OnTimerContext, out: Collector[OrderResult]): Unit = {
    // 1.pay来了，没有create
    if (isPayedState.value()) {
      ctx.output(orderTimeoutOutputTag, OrderResult(ctx.getCurrentKey, "payed but not found create log"))
    } else {
      // 2. create来了，没有pay
      ctx.output(orderTimeoutOutputTag, OrderResult(ctx.getCurrentKey, "order timeout"))
    }

    timerTsState.clear()
    isPayedState.clear()
    isCreatedState.clear()
  }
}











































